How do I correctly define a Spring Integration JpaOutboundGateway in Java configuration?

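I am trying to replace my Spring Integration XML configuration with JavaConfig, but I cannot define the JpaOutboundGateway correctly. In XML, I have the following bean definition: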

<int-jpa:outbound-channel-adapter id="saveOffer" entity-manager-factory="entityManagerFactory" channel="channel5"
                                  flush-size="100" entity-class="JobOffer">
    <int-jpa:transactional transaction-manager="transactionManager"/>
</int-jpa:outbound-channel-adapter>

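This works fine. But when I try to move it to a Java configuration bean, I don't know how to define the transaction management, and I keep getting a transaction-related error: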

@Bean
@ServiceActivator(inputChannel="channel5")
public JpaOutboundGatewayFactoryBean save() throws Exception {

    JpaExecutor executor = new JpaExecutor(entityManager);
    executor.setFlushSize(10);
    executor.setEntityClass(JobOffer.class);

    JpaOutboundGatewayFactoryBean factoryBean = new JpaOutboundGatewayFactoryBean();
    factoryBean.setJpaExecutor(executor);
    factoryBean.setProducesReply(false);

    return factoryBean;
}

I tried passing both an EntityManager and an EntityManagerFactory to the JpaExecutor, but in both cases the following exception is thrown:

2016-09-03 09:20:42.440 ERROR o.s.integration.handler.LoggingHandler  : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jpa.outbound.JpaOutboundGateway@1fc306d3]; nested exception is javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'merge' call
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
    at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:129)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'merge' call
    at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:282)
    at com.sun.proxy.$Proxy121.merge(Unknown Source)
    at org.springframework.integration.jpa.core.DefaultJpaOperations.persistOrMerge(DefaultJpaOperations.java:248)
    at org.springframework.integration.jpa.core.DefaultJpaOperations.merge(DefaultJpaOperations.java:215)
    at org.springframework.integration.jpa.core.JpaExecutor.executeOutboundJpaOperation(JpaExecutor.java:253)
    at org.springframework.integration.jpa.outbound.JpaOutboundGateway.handleRequestMessage(JpaOutboundGateway.java:81)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    ... 19 more

2 answers

  1. # Answer 1

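    This is one of the more complex endpoints to wire up. We need to add a Java configuration example to the reference manual; I have opened a JIRA issue for that.

    You can always look at the XML parser to see what is needed.

    In particular:

    1. The JpaExecutor should be defined as a @Bean.
    2. You need to add a transaction advice to the adapter's txAdviceChain.
    3. You can look at IntegrationNamespaceUtils.configureTransactionAttributes to see what is needed to wire the transaction settings (transaction attributes, attribute source, transaction manager, etc.) into a TransactionInterceptor.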
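
    To illustrate why the advice in point 2 matters, here is a minimal plain-Java sketch of what a transaction advice does around a handler call. It uses no Spring classes at all: `FakeTransactionManager`, `mergeHandler`, and `withTransaction` are hypothetical stand-ins invented for this illustration, and the real `TransactionInterceptor` is considerably more involved. The unadvised handler fails in the same spirit as the `TransactionRequiredException` in the question, while the wrapped handler runs between a begin and a commit.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    // Conceptual stand-in only: none of these types are Spring classes.
    class TxAdviceSketch {

        /** Hypothetical transaction manager that only tracks whether a transaction is open. */
        static final class FakeTransactionManager {
            private boolean active;
            final List<String> log = new ArrayList<>();

            void begin()    { active = true;  log.add("begin"); }
            void commit()   { active = false; log.add("commit"); }
            void rollback() { active = false; log.add("rollback"); }
            boolean isActive() { return active; }
        }

        /** Hypothetical handler: refuses to write when no transaction is open. */
        static Consumer<String> mergeHandler(FakeTransactionManager tm) {
            return payload -> {
                if (!tm.isActive()) {
                    throw new IllegalStateException(
                            "no transaction available for 'merge' of " + payload);
                }
                tm.log.add("merge " + payload);
            };
        }

        /** Plays the role of the advice: wraps one handler call in begin/commit, rolling back on failure. */
        static Consumer<String> withTransaction(FakeTransactionManager tm, Consumer<String> target) {
            return payload -> {
                tm.begin();
                try {
                    target.accept(payload);
                    tm.commit();
                } catch (RuntimeException e) {
                    tm.rollback();
                    throw e;
                }
            };
        }

        public static void main(String[] args) {
            FakeTransactionManager tm = new FakeTransactionManager();
            Consumer<String> bare = mergeHandler(tm);
            Consumer<String> advised = withTransaction(tm, bare);

            try {
                bare.accept("offer-1");           // no transaction is open here
            } catch (IllegalStateException e) {
                System.out.println("unadvised: " + e.getMessage());
            }

            advised.accept("offer-1");            // begin, merge, commit
            System.out.println("advised: " + tm.log);
        }
    }
    ```

    In the real configuration, the advice chain on the gateway takes the place of `withTransaction`, and the transaction manager bean takes the place of the fake one.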
  2. # Answer 2

    My "ugly but working" version:

    // entityManagerFactory is assumed to be a field injected into this configuration class.
    @Bean
    public JpaExecutor jpaExecutor() {

        JpaExecutor executor = new JpaExecutor(entityManagerFactory);
        executor.setFlushSize(10);
        executor.setEntityClass(JobOffer.class);
        return executor;
    }

    @Bean
    @ServiceActivator(inputChannel="channel5")
    public JpaOutboundGateway save(JpaExecutor jpaExecutor, PlatformTransactionManager transactionManager) throws Exception {

        // Transaction attribute: start a new, writable transaction for each call.
        DefaultTransactionAttribute dta = new DefaultTransactionAttribute();
        dta.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
        dta.setReadOnly(false);

        // Attribute source that returns the same attribute for every method.
        MatchAlwaysTransactionAttributeSource matas = new MatchAlwaysTransactionAttributeSource();
        matas.setTransactionAttribute(dta);

        // Interceptor that opens and completes the transaction around the advised call.
        TransactionInterceptor ic = new TransactionInterceptor();
        ic.setTransactionManager(transactionManager);
        ic.setTransactionAttributeSource(matas);

        List<Advice> adviceChain = new ArrayList<>();
        adviceChain.add(ic);

        JpaOutboundGateway gateway = new JpaOutboundGateway(jpaExecutor);
        gateway.setAdviceChain(adviceChain);
        // One-way: persist or merge the payload without producing a reply.
        gateway.setProducesReply(false);

        return gateway;
    }